package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.JedisUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Field;

/**
 * Flink map function that, for every incoming record, reads a key from the record via
 * reflection and looks up the matching dimension row through
 * {@link DimInfoFunction#getDimInfo}.
 *
 * <p>The HBase connection and the Jedis client are obtained in {@link #open(Configuration)}
 * and released in {@link #close()}.
 *
 * <p>NOTE(review): the looked-up dimension data is currently not merged into the record;
 * {@link #map(Object)} returns its input unchanged.
 *
 * @param <T> type of the records flowing through the function
 */
public class DimMapFunction2<T> extends RichMapFunction<T, T> {

    // Runtime handles are created in open(); transient keeps them out of the serialized function.
    private transient Connection connection;
    private transient Jedis jedis;

    private final String tableName;
    private final String keyColumn;

    // Cache of the reflective lookup so it is not repeated for every record.
    private transient Class<?> keyFieldOwner;
    private transient Field keyField;

    /**
     * Creates a function for the given dimension table without a key column.
     *
     * <p>Kept for backward compatibility. No key column can be derived from the table name
     * alone, so {@link #map(Object)} throws {@link IllegalStateException} on an instance
     * built this way; prefer {@link #DimMapFunction2(String, String)}.
     *
     * @param tableName name of the dimension table to query
     */
    public DimMapFunction2(String tableName) {
        this(tableName, null);
    }

    /**
     * Creates a function for the given dimension table and key field.
     *
     * @param tableName name of the dimension table to query
     * @param keyColumn name of the field on the record that holds the lookup key
     */
    public DimMapFunction2(String tableName, String keyColumn) {
        this.tableName = tableName;
        this.keyColumn = keyColumn;
    }

    /**
     * Acquires the HBase connection and the Jedis client used by {@link #map(Object)}.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if either client cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = HBaseUtil.getConnection();
        jedis = JedisUtil.getJedis();
    }

    /**
     * Looks up the dimension row for the record's key and returns the record.
     *
     * @param t incoming record
     * @return the same record instance, unchanged
     * @throws IllegalStateException if no key column was configured
     * @throws NoSuchFieldException  if the record class has no field named like the key column
     * @throws Exception             if reading the field or the dimension lookup fails
     */
    @Override
    public T map(T t) throws Exception {

        // Obtain the primary-key value via reflection.
        Object value = resolveKeyField(t.getClass()).get(t);
        if (value == null) {
            // Without a key there is nothing to look up; pass the record through.
            return t;
        }

        // The result is intentionally not stored: nothing consumes it yet. The call is kept
        // because it may have side effects (e.g. cache population) - TODO confirm and merge
        // the returned data into the record.
        DimInfoFunction.getDimInfo(jedis, connection, tableName, value.toString());

        return t;
    }

    /**
     * Releases the HBase connection and the Jedis client.
     *
     * <p>Tolerates a partially failed {@link #open(Configuration)} and still closes the Jedis
     * client when closing the connection throws.
     *
     * @throws Exception if closing either client fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    /**
     * Returns the key field for the given record class, resolving and caching it on first use.
     * Public fields are looked up first (as before); non-public fields declared anywhere in the
     * class hierarchy are used as a fallback.
     */
    private Field resolveKeyField(Class<?> recordClass) throws NoSuchFieldException {
        if (keyField != null && keyFieldOwner == recordClass) {
            return keyField;
        }
        if (keyColumn == null) {
            throw new IllegalStateException(
                    "No key column configured for dimension table '" + tableName
                            + "'; construct DimMapFunction2 with (tableName, keyColumn)");
        }

        Field resolved;
        try {
            resolved = recordClass.getField(keyColumn);
        } catch (NoSuchFieldException notPublic) {
            resolved = findDeclaredField(recordClass, keyColumn);
            if (resolved == null) {
                throw notPublic;
            }
            resolved.setAccessible(true);
        }

        keyField = resolved;
        keyFieldOwner = recordClass;
        return resolved;
    }

    /** Searches the class and its superclasses for a declared field; returns null if absent. */
    private static Field findDeclaredField(Class<?> type, String name) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; continue with the superclass.
            }
        }
        return null;
    }
}
